# Class Imbalance

Class imbalance arises when the classes in the data set are represented by very different numbers of samples, e.g. when there are many more negative examples than positive ones.

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
import ipywidgets
from ipywidgets import interact, interactive, interact_manual
import IPython
from matplotlib import rcParams
rcParams['figure.figsize'] = (16, 8)
rcParams['font.size'] = 16

import numpy as np
import matplotlib.pyplot as plt
from utilities.load_data import linear_separable_data, circular_separable_data
from utilities import plot_helpers 
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier
import sklearn.metrics as metrics
# from sklearn.metrics import plot_roc_curve
# from sklearn.exceptions import ConvergenceWarning

import warnings
# warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.filterwarnings(action='ignore')

## Define the usual metrics to evaluate classifiers

In [None]:
def build_confusion_matrix(pred_label, true_label, num_classes=2):
    """Count how often each (predicted, true) class pair occurs.

    Labels must be integer class indices in {0, 1, ..., num_classes - 1};
    samples with a label outside that range are not counted.

    Args:
        pred_label: Array-like of predicted class indices.
        true_label: Array-like of true class indices, aligned with
            ``pred_label``.
        num_classes: Number of classes, i.e. the side length of the matrix.

    Returns:
        A float array of shape ``(num_classes, num_classes)`` whose entry
        ``[row, col]`` is the number of samples predicted as class ``row``
        whose true class is ``col``.
    """
    # Coerce first: comparing a plain list with an int yields a single
    # ``False`` instead of an element-wise mask, which would silently give
    # an all-zero matrix.
    pred_label = np.asarray(pred_label)
    true_label = np.asarray(true_label)

    confusion_matrix = np.zeros((num_classes, num_classes))
    for row in range(num_classes):
        predicted_as_row = pred_label == row
        for col in range(num_classes):
            confusion_matrix[row, col] = np.sum(predicted_as_row & (true_label == col))
    return confusion_matrix

def accuracy(pred_label, true_label):
    """Return the fraction of predictions that equal the true label.

    The value is computed twice, directly and from the confusion matrix,
    and the two results are cross-checked. The confusion-matrix route maps
    labels with ``(label + 1) // 2``, so it is only meaningful for labels in
    {-1, 1}.

    Args:
        pred_label: Array-like of predicted labels.
        true_label: Array-like of true labels, aligned with ``pred_label``.

    Returns:
        The accuracy in [0, 1], or 0.0 when there are no samples.
    """
    pred_label = np.asarray(pred_label)
    true_label = np.asarray(true_label)

    total = len(true_label)
    if total == 0:
        # Without this guard both options evaluate 0 / 0 and the
        # cross-check below fails on nan != nan.
        return 0.0

    # Option 1, works for any prediction output.
    correct = np.sum(pred_label == true_label)
    acc = correct / total

    # Option 2, works for prediction in {-1, 1}.
    confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)
    acc2 = np.sum(np.diag(confusion_matrix)) / np.sum(confusion_matrix)

    # Tolerant comparison: the two routes are different float computations.
    assert np.isclose(acc, acc2)
    return acc
 

def precision(pred_label, true_label):
    """Return the precision TP / (TP + FP) of a binary prediction.

    Labels are expected in {-1, 1}, with 1 as the positive class. The value
    is computed twice, directly and from the confusion matrix, and the two
    results are cross-checked.

    Args:
        pred_label: Array-like of predicted labels.
        true_label: Array-like of true labels, aligned with ``pred_label``.

    Returns:
        The fraction of predicted positives that are truly positive, or 0
        when no sample was predicted positive.
    """
    # Coerce first: with plain lists ``label == 1`` is a scalar ``False``
    # and the function would silently return 0.
    pred_label = np.asarray(pred_label)
    true_label = np.asarray(true_label)

    # Option 1, works for prediction in {-1, 1}.
    predicted_positive = pred_label == 1
    true_positive = np.sum((true_label == 1) & predicted_positive)
    false_positive = np.sum((true_label == -1) & predicted_positive)

    total_pred_positive = true_positive + false_positive
    # Trips when a predicted positive has a true label outside {-1, 1}.
    assert total_pred_positive == np.sum(predicted_positive)
    if total_pred_positive == 0:
        return 0

    prec1 = true_positive / total_pred_positive

    # Option 2, works for prediction in {-1, 1}.
    # Row 1 of the matrix holds every sample predicted as positive.
    confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)
    prec2 = confusion_matrix[1, 1] / np.sum(confusion_matrix[1])

    assert np.isclose(prec1, prec2)
    return prec1


def recall(pred_label, true_label):
    """Return the recall TP / (TP + FN) of a binary prediction.

    Labels are expected in {-1, 1}, with 1 as the positive class. The value
    is computed twice, directly and from the confusion matrix, and the two
    results are cross-checked.

    Args:
        pred_label: Array-like of predicted labels.
        true_label: Array-like of true labels, aligned with ``pred_label``.

    Returns:
        The fraction of truly positive samples that were predicted positive,
        or 0 when there is no positive sample.
    """
    # Coerce first: with plain lists ``label == 1`` is a scalar ``False``
    # and the function would silently return 0.
    pred_label = np.asarray(pred_label)
    true_label = np.asarray(true_label)

    # Option 1, works for prediction in {-1, 1}.
    actual_positive = true_label == 1
    true_positive = np.sum(actual_positive & (pred_label == 1))
    false_negative = np.sum(actual_positive & (pred_label == -1))

    total_actual_positive = true_positive + false_negative
    # Trips when a positive sample has a prediction outside {-1, 1}.
    assert total_actual_positive == np.sum(actual_positive)
    if total_actual_positive == 0:
        return 0

    rec1 = true_positive / total_actual_positive

    # Option 2, works for prediction in {-1, 1}.
    # Column 1 of the matrix holds every sample whose true class is positive.
    confusion_matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)
    rec2 = confusion_matrix[1, 1] / np.sum(confusion_matrix[:, 1])

    assert np.isclose(rec1, rec2)
    return rec1

def f1_score(pred_label, true_label):
    """Return the F1 score, the harmonic mean of precision and recall.

    Args:
        pred_label: Predicted labels, forwarded to ``precision``/``recall``.
        true_label: True labels, forwarded to ``precision``/``recall``.

    Returns:
        ``2 / (1 / precision + 1 / recall)``, or 0 when either precision or
        recall is 0 (the harmonic mean would otherwise divide by zero).
    """
    prec = precision(pred_label, true_label)
    rec = recall(pred_label, true_label)

    if prec != 0 and rec != 0:
        return 2 / (1 / prec + 1 / rec)
    return 0
 
def make_meshgrid(x, y, h=.02):
    """Build the evaluation grid over the fixed square [-2, 3) x [-2, 3).

    Args:
        x: Unused; the grid bounds are fixed rather than derived from data.
        y: Unused, see ``x``.
        h: Grid step along both axes.

    Returns:
        A tuple ``(xx, yy)`` of coordinate matrices as produced by
        ``np.meshgrid``.
    """
    lower, upper = -2, 3
    x_ticks = np.arange(lower, upper, h)
    y_ticks = np.arange(lower, upper, h)
    grid_x, grid_y = np.meshgrid(x_ticks, y_ticks)
    return grid_x, grid_y
 
def plot_imbalanced(X, Y, classifier=None, Xtest=None, Ytest=None, fig=None):
 """Scatter-plot a two-class data set and, optionally, a classifier's predictions.

 With ``Xtest`` left as None a single panel is drawn. Otherwise a
 two-panel figure is created and this function calls itself once per
 panel ('Train Data' and 'Test Data').

 NOTE(review): the nesting of this body cannot be read off the
 indentation shown here; the comments describe the presumed structure.

 Args:
 X: Sample array; columns 0 and 1 are used as plot coordinates.
 Y: Labels; samples with label 1 and -1 are drawn as separate series.
 classifier: Optional object with a ``predict`` method; when given, its
 predictions on a grid are drawn as filled contours.
 Xtest: Optional second sample array, drawn in the second panel.
 Ytest: Labels for ``Xtest``.
 fig: Optional target handed to ``plot_helpers.plot_data``; created with
 ``plt.subplot(111)`` when omitted in the single-panel case.
 """
 # Single-panel case: no test set was passed.
 if Xtest is None:
 # Create a full-figure subplot when the caller did not supply a target.
 if fig is None:
 fig = plt.subplot(111)
 # Samples labelled 1, with marker 'r*' and legend label '+'.
 opt = {'marker': 'r*', 'label': '+'}
 plot_helpers.plot_data(X[np.where(Y == 1)[0], 0], X[np.where(Y == 1)[0], 1], fig=fig, options=opt)
 # Samples labelled -1, with marker 'bs' and legend label '-'.
 opt = {'marker': 'bs', 'label': '-'}
 plot_helpers.plot_data(X[np.where(Y == -1)[0], 0], X[np.where(Y == -1)[0], 1], fig=fig, options=opt)

 # Overlay the classifier's prediction at every grid point.
 if classifier is not None:
 xx, yy = make_meshgrid(X[:, 0], X[:, 1])
 Z = classifier.predict(np.c_[xx.ravel(), yy.ravel()])
 Z = Z.reshape(xx.shape)
 out = plt.contourf(xx, yy, Z, colors=['blue', 'red'], alpha=0.3)

 # Same [-2, 3] window that make_meshgrid covers.
 # NOTE(review): unclear whether these two calls belong inside the
 # `classifier is not None` branch or one level up - confirm.
 plt.xlim([-2, 3])
 plt.ylim([-2, 3])
 
 # Two-panel case: draw train and test side by side.
 else:
 fig, ax = plt.subplots(1, 2)
 # Make each axes current before drawing, since the single-panel
 # branch uses pyplot-level calls (contourf, xlim, ylim).
 plt.sca(ax[0])
 plot_imbalanced(X, Y, classifier, fig=ax[0])
 plt.title('Train Data')
 
 plt.sca(ax[1])
 plot_imbalanced(Xtest, Ytest, classifier, fig=ax[1])
 plt.title('Test Data')


def print_metrics(pred_label, true_label, pred_score):
    """Print a one-line metric summary followed by the confusion matrix.

    Args:
        pred_label: Predicted labels in {-1, 1}.
        true_label: True labels in {-1, 1}.
        pred_score: Continuous scores passed to ``metrics.roc_curve`` to
            compute the area under the ROC curve.
    """
    summary = 'Accuracy: {:.2f}. Precision: {:.2f}. Recall: {:.2f}. F1-Score: {:.2f}. AUC: {:.2f}'

    # Label-based metrics.
    label_scores = (
        accuracy(pred_label, true_label),
        precision(pred_label, true_label),
        recall(pred_label, true_label),
        f1_score(pred_label, true_label),
    )
    # Shift labels from {-1, 1} to the {0, 1} indices the matrix expects.
    matrix = build_confusion_matrix((pred_label + 1) // 2, (true_label + 1) // 2)

    # Score-based metric: area under the ROC curve.
    false_pos_rate, true_pos_rate, _ = metrics.roc_curve(true_label, pred_score)
    area = metrics.auc(false_pos_rate, true_pos_rate)

    print(summary.format(*label_scores, area))
    print('Confusion Matrix: \n', matrix)
 
 

In [None]:
def generate_data(num_positive, num_negative, noise):
    """Generate a two-class data set and split it 80/20 into train and test.

    Args:
        num_positive: Number of positive samples requested from
            ``linear_separable_data``.
        num_negative: Number of negative samples requested from
            ``linear_separable_data``.
        noise: Noise level forwarded to ``linear_separable_data``.

    Returns:
        ``(X, Y, Xtrain, Ytrain, Xtest, Ytest)``: the full data set followed
        by its random train and test parts.
    """
    X, Y = linear_separable_data(num_positive, num_negative, noise=noise, dim=2)

    # Derive the sample count from the arguments instead of a module-level
    # `total`; otherwise a call with different sizes would only split the
    # indices of whatever data set the global was computed for.
    # Assumes the generator returns num_positive + num_negative rows - TODO confirm.
    total = num_positive + num_negative

    train_idx = np.random.choice(total, int(0.8 * total), replace=False)
    # Everything not drawn for training, in ascending index order.
    test_idx = np.setdiff1d(np.arange(total), train_idx)

    Xtrain, Ytrain = X[train_idx], Y[train_idx]
    Xtest, Ytest = X[test_idx], Y[test_idx]

    return X, Y, Xtrain, Ytrain, Xtest, Ytest

# Toy problem with ten times more negative than positive samples.
num_positive, num_negative = 10, 100
total = num_positive + num_negative
noise = 0.5

# Fixed seed so the random train/test split is reproducible.
np.random.seed(0)
X, Y, Xtrain, Ytrain, Xtest, Ytest = generate_data(num_positive, num_negative, noise)
plot_imbalanced(X, Y)

## Option 0: Vanilla Classifier

In [None]:
def vanilla(X, Y, classifier, Xtest=None, Ytest=None):
    """Fit the classifier on the data as given, without any re-balancing.

    Args:
        X: Training samples; only the first two columns are used for fitting.
        Y: Training labels.
        classifier: Estimator with a ``fit`` method; it is fitted in place.
        Xtest: Optional test samples, only used for plotting.
        Ytest: Optional test labels, only used for plotting.

    Returns:
        The fitted ``classifier``.
    """
    features = X[:, :2]
    classifier.fit(features, Y)
    plot_imbalanced(X, Y, classifier, Xtest, Ytest)
    return classifier

classifier = vanilla(Xtrain, Ytrain, LinearSVC(), Xtest, Ytest)

# Report the same metrics on the train split, then on the test split.
for split_name, X_split, Y_split in (('Train', Xtrain, Ytrain), ('Test', Xtest, Ytest)):
    print(split_name)
    Ypred = classifier.predict(X_split[:, :2])
    Spred = classifier.decision_function(X_split[:, :2])
    print_metrics(Ypred, Y_split, Spred)

## Option 1: Downsampling majority class

In [None]:
def downsampling(X, Y, classifier, Xtest=None, Ytest=None):
    """Fit the classifier on all positives plus an equally sized random subset of negatives.

    Args:
        X: Training samples; only the first two columns are used for fitting.
        Y: Training labels in {-1, 1}.
        classifier: Estimator with a ``fit`` method; it is fitted in place.
        Xtest: Optional test samples, only used for plotting.
        Ytest: Optional test labels, only used for plotting.

    Returns:
        The fitted ``classifier``.
    """
    positives = np.where(Y == 1)[0]
    negatives = np.where(Y == -1)[0]
    n_positives, n_negatives = len(positives), len(negatives)

    # Draw, without replacement, as many negatives as there are positives.
    kept = np.random.choice(np.arange(n_negatives), n_positives, replace=False)
    assert len(kept) == n_positives

    balanced_idx = np.concatenate((positives, negatives[kept]))
    X_down = X[balanced_idx]
    Y_down = Y[balanced_idx]

    # Use only the first two features as the classifier fits a bias term.
    classifier.fit(X_down[:, :2], Y_down)

    plot_imbalanced(X_down, Y_down, classifier, Xtest, Ytest)
    return classifier

classifier = downsampling(Xtrain, Ytrain, LinearSVC(), Xtest, Ytest)

# Metrics are reported on the full (imbalanced) train split, then on test.
for split_name, X_split, Y_split in (('Train', Xtrain, Ytrain), ('Test', Xtest, Ytest)):
    print(split_name)
    Ypred = classifier.predict(X_split[:, :2])
    Spred = classifier.decision_function(X_split[:, :2])
    print_metrics(Ypred, Y_split, Spred)



## Option 2: Upsampling minority class

In [None]:
def upsampling(X, Y, classifier, up_sampling_noise=1e-1, Xtest=None, Ytest=None):
    """Fit the classifier after resampling the positives up to the number of negatives.

    Positives are drawn with replacement until there are as many as
    negatives, and Gaussian noise is added to the first two features of the
    drawn copies.

    Args:
        X: Training samples; only the first two columns are used for fitting.
        Y: Training labels in {-1, 1}.
        classifier: Estimator with a ``fit`` method; it is fitted in place.
        up_sampling_noise: Scale of the Gaussian perturbation.
        Xtest: Optional test samples, only used for plotting.
        Ytest: Optional test labels, only used for plotting.

    Returns:
        The fitted ``classifier``.
    """
    positives = np.where(Y == 1)[0]
    negatives = np.where(Y == -1)[0]
    n_positives, n_negatives = len(positives), len(negatives)

    # Draw positives with replacement until they match the negatives in count.
    drawn = np.random.choice(np.arange(n_positives), n_negatives, replace=True)
    assert len(drawn) == n_negatives
    resampled_idx = np.concatenate((positives[drawn], negatives))

    # Integer-array indexing copies, so the jitter below leaves X untouched.
    X_up = X[resampled_idx]
    Y_up = Y[resampled_idx]

    # The first n_negatives rows are the resampled positives: perturb the up-sampling.
    jitter = up_sampling_noise * np.random.randn(n_negatives, 2)
    X_up[:n_negatives, :2] += jitter

    # Use only the first two features as the classifier fits a bias term.
    classifier.fit(X_up[:, :2], Y_up)

    plot_imbalanced(X_up, Y_up, classifier, Xtest, Ytest)
    return classifier

up_sampling_noise = 1e-1
classifier = upsampling(Xtrain, Ytrain, LinearSVC(), up_sampling_noise, Xtest, Ytest)

# Metrics are reported on the original train split, then on test.
for split_name, X_split, Y_split in (('Train', Xtrain, Ytrain), ('Test', Xtest, Ytest)):
    print(split_name)
    Ypred = classifier.predict(X_split[:, :2])
    Spred = classifier.decision_function(X_split[:, :2])
    print_metrics(Ypred, Y_split, Spred)

## Option 3: Cost-Sensitive Classification

In [None]:
def cost_sensitive(X, Y, classifier, class_ratio, Xtest=None, Ytest=None):
    """Fit the classifier with the positive class weighted by ``class_ratio``.

    Args:
        X: Training samples; only the first two columns are used for fitting.
        Y: Training labels in {-1, 1}.
        classifier: Estimator with ``fit``; its ``class_weight`` attribute is
            overwritten before fitting.
        class_ratio: Weight given to class 1; class -1 keeps weight 1.
        Xtest: Optional test samples, only used for plotting.
        Ytest: Optional test labels, only used for plotting.

    Returns:
        The fitted ``classifier``.
    """
    weights = {-1: 1, 1: class_ratio}
    classifier.class_weight = weights

    features = X[:, :2]
    classifier.fit(features, Y)
    plot_imbalanced(X, Y, classifier, Xtest, Ytest)
    return classifier

# Weight positives by how much rarer they are than negatives.
class_ratio = num_negative / num_positive
classifier = cost_sensitive(Xtrain, Ytrain, LinearSVC(), class_ratio, Xtest, Ytest)

for split_name, X_split, Y_split in (('Train', Xtrain, Ytrain), ('Test', Xtest, Ytest)):
    print(split_name)
    Ypred = classifier.predict(X_split[:, :2])
    Spred = classifier.decision_function(X_split[:, :2])
    print_metrics(Ypred, Y_split, Spred)



# Compare All Methods

In [None]:
up_sampling_noise = 1e-1


def imbalanced_learn(method, noise, classifier, weight_ratio):
    """Train one classifier with one re-balancing method and print its metrics.

    A fresh data set with 20 positive and 200 negative samples is generated
    under a fixed seed, the chosen classifier is fitted with the chosen
    method, and train and test metrics are printed.

    Args:
        method: One of 'vanilla', 'downsampling', 'upsampling' or
            'cost-sensitive' (case-insensitive).
        noise: Noise level forwarded to ``generate_data``.
        classifier: 'svm' or 'perceptron' (case-insensitive).
        weight_ratio: Positive-class weight, used only by 'cost-sensitive'.

    Raises:
        ValueError: If ``method`` or ``classifier`` is not a supported name.
    """
    # Reject unknown methods up front; otherwise every branch below is
    # skipped and prediction is attempted on an estimator that was never fitted.
    method_name = method.lower()
    if method_name not in ('vanilla', 'downsampling', 'upsampling', 'cost-sensitive'):
        raise ValueError('Not implemented method: {!r}.'.format(method))

    np.random.seed(0)
    X, Y, Xtrain, Ytrain, Xtest, Ytest = generate_data(num_positive=20, num_negative=200, noise=noise)

    classifier_name = classifier.lower()
    if classifier_name == 'svm':
        classifier = LinearSVC()
    elif classifier_name == 'perceptron':
        classifier = SGDClassifier(loss='perceptron', random_state=1, max_iter=1000)
    else:
        raise ValueError('Not Implemented classifer.')

    if method_name == 'vanilla':
        classifier = vanilla(Xtrain, Ytrain, classifier, Xtest, Ytest)
    elif method_name == 'downsampling':
        classifier = downsampling(Xtrain, Ytrain, classifier, Xtest, Ytest)
    elif method_name == 'upsampling':
        classifier = upsampling(Xtrain, Ytrain, classifier, up_sampling_noise, Xtest, Ytest)
    else:  # 'cost-sensitive', guaranteed by the check above
        classifier = cost_sensitive(Xtrain, Ytrain, classifier, weight_ratio, Xtest, Ytest)

    print('Train')
    Ypred = classifier.predict(Xtrain[:, :2])
    Spred = classifier.decision_function(Xtrain[:, :2])
    print_metrics(Ypred, Ytrain, Spred)

    print('Test')
    Ypred = classifier.predict(Xtest[:, :2])
    Spred = classifier.decision_function(Xtest[:, :2])
    print_metrics(Ypred, Ytest, Spred)
 

# Dropdown choices; the first entry of each list is the initial selection.
method_options = ['vanilla', 'downsampling', 'upsampling', 'cost-sensitive']
classifier_options = ['perceptron', 'svm']

noise_widget = ipywidgets.FloatSlider(
    value=0.6, min=0, max=1, step=0.1, continuous_update=False)
# Log-scale slider: min/max are exponents, presumably base 10 - see the ipywidgets docs.
weight_ratio_widget = ipywidgets.FloatLogSlider(
    value=10, min=-2, max=3, continuous_update=False)

# The trailing semicolon keeps the notebook from echoing the return value.
interact(
    imbalanced_learn,
    method=method_options,
    noise=noise_widget,
    classifier=classifier_options,
    weight_ratio=weight_ratio_widget,
);
 